package com.gitee.jastee.kafka.consumer;

import cn.hutool.core.date.DateUtil;
import com.gitee.jastee.util.ParameterTool;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static com.gitee.jastee.kafka.constant.KafkaConstants.KAFKA_PREFIX;

/** Kafka 消费端连接 @Author jast @Date 2020/4/19 下午2:22 @Version 1.0 */
public class KafkaConsumerClient {

    /**
     * 消费数据 使用
     *
     * @return KafkaConsumer<String,String>
     * @return
     */
    public KafkaConsumer<String, String> createConsumer(ParameterTool parameterTool) {
        // String topic, String group, int max_poll_records, boolean isLatest
        Properties props = new Properties();

        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                parameterTool.getRequired(KAFKA_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        //        props.put("zookeeper.session.timeout.ms", "4000");
        //        props.put("zookeeper.sync.time.ms", "200");
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                parameterTool.get(
                        KAFKA_PREFIX + ConsumerConfig.GROUP_ID_CONFIG,
                        DateUtil.format(DateUtil.date(), "yyyyMMdd")));
        // 控制每次poll的数量
        props.put(
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
                parameterTool.getInt(KAFKA_PREFIX + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000));
        // 连接超时
        props.put(
                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
                parameterTool.getInt(
                        KAFKA_PREFIX + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 90000));
        // 请求超时
        props.put(
                ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
                parameterTool.getInt(
                        KAFKA_PREFIX + ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100000));
        props.put(
                ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
                parameterTool.getInt(
                        KAFKA_PREFIX + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 90000));
        // 自动提交 false
        props.put(
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                parameterTool.getBoolean(
                        KAFKA_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true));
        props.put(
                ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                parameterTool.getInt(
                        KAFKA_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000));
        props.put(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                parameterTool.get(
                        KAFKA_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));

        props.put(
                ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
                parameterTool.getInt(
                        KAFKA_PREFIX + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
                        Integer.MAX_VALUE));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(
                ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
                parameterTool.getInt(
                        KAFKA_PREFIX + ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Integer.MAX_VALUE));

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(parameterTool.getRequired("kafka.topic")));
        return consumer;
    }

    public static void main(String[] args) throws InterruptedException {
        // KafkaConsumer<String, String> consumer =
        //        new KafkaConsumerClient().createConsumer("userChange", "group_id5", 1, false);
        // while (true) {
        //    ConsumerRecords<String, String> poll = consumer.poll(1000);
        //    for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
        //        String value = stringStringConsumerRecord.value();
        //        String key = stringStringConsumerRecord.key();
        //        System.out.println("key:" + key + ",value:" + value);
        //    }
        TimeUnit.SECONDS.sleep(3);
        // System.out.println("next");
        // }
    }
}
